草庐IT

flink 流批

全部标签

Flink oracle cdc - Oracle Logminer CDC性能问题

最近的项目中有用到Flink Oracle CDC实时到监听数据库变化,将变化的数据sink到Kafka。Oracle CDC依赖Debezium组件解析Redo Log与Archive Log,Debezium 通过Oracle 的Logminer解析Log。在我们生产环境遇到运行一段时间后,再也查询不到数据,直到报miss log file异常(线上环境cron job 将一小时前的archvied log压缩生成gzip文件),Flink job运行失败。日志量比较大的时候,延迟非常大,每小时archived log size超过60G时延迟去到小时级别。分析问题前,先简单介绍下Orac

使用Java代码远程提交flink任务

publicclassFlinkTask{privateStringJobManagerAddress="xxxx";publicJobIDrunTask(StringjarPath,intparallelism,StringentryPointClassName){RestClusterClientclient=null;JobIDjobId=null;try{//集群信息Configurationconfiguration=newConfiguration();configuration.setString(JobManagerOptions.ADDRESS,JobManagerAddre

Flink中max和maxBy的区别及使用

在Flink中max算子和maxBy算子都是用来求取最大值的,下面将结合代码介绍一下它俩的相同点和不同点相同点都是滚动聚合都会根据代码的逻辑更新状态中记录的聚合值,并输出不同点max算子只会更新最大值的字段,maxBy算子会更新整条数据,下面就结合代码看和结果看一下相同点及区别测试数据小明,M,25小花,W,27小美,W,29小强,M,24小刚,M,29小A,M,25小B,W,27小C,W,29小D,M,24小E,M,29max算子publicstaticvoidmain(String[]args)throwsException{//创建流处理环境StreamExecutionEnvironm

flink1.17.0 集成kafka,并且计算

前言flink是实时计算的重要集成组件,这里演示如何集成,并且使用一个小例子。例子是kafka输入消息,用逗号隔开,统计每个相同单词出现的次数,这么一个功能。一、kafka环境准备1.1启动kafka这里我使用的kafka版本是3.2.0,部署的方法可以参考,kafka部署cdkafka_2.13-3.2.0bin/zookeeper-server-start.shconfig/zookeeper.propertiesbin/kafka-server-start.shconfig/server.properties启动后查看java进程是否存在,存在后执行下一步。1.2新建topic新建一个专

亲测有效:flink上传jar包出现Server Response Message:Internal server error的解决办法

一:分析是什么1.先摆问题:Flink平台SubmitNewJob中上传Mavenpackage打包后的jar包,填上全类名,提交后出现ServerResponseMessage:Internalservererror错误2.查看Log报错,发现是第一张截图中的第二步中填写的全类名在jar包中找不到3.于是可以初步判定不是虚拟机上Flink的配置错误,而是上传的jar包出了问题。通过笔者查找了半天,终于发现了问题所在,在WordCount下的target文件夹中的classes文件夹,Maven只编译了maven项目中的src/main/java中的App,没有编译scala中的主类二:分析为

亲测有效:flink上传jar包出现Server Response Message:Internal server error的解决办法

一:分析是什么1.先摆问题:Flink平台SubmitNewJob中上传Mavenpackage打包后的jar包,填上全类名,提交后出现ServerResponseMessage:Internalservererror错误2.查看Log报错,发现是第一张截图中的第二步中填写的全类名在jar包中找不到3.于是可以初步判定不是虚拟机上Flink的配置错误,而是上传的jar包出了问题。通过笔者查找了半天,终于发现了问题所在,在WordCount下的target文件夹中的classes文件夹,Maven只编译了maven项目中的src/main/java中的App,没有编译scala中的主类二:分析为

Flink配置Yarn日志聚合、配置历史日志。

Flink配置Yarn日志聚合、配置历史日志对于已经结束的yarn应用,flink进程已经退出无法提供webui服务。所以需要通过JobHistoryServer查看保留在yarn上的日志。下面就给大家分享一下我在配置方面的经历吧。1.yarn配置聚合日志编辑:yarn-site.xml说明:开启后任务执行“完毕”后,才会上传日志至hdfs查询:yarnlogs-applicationIdapplication_1546250639760_0055配置:property>name>yarn.log-aggregation.retain-secondsname>value>10080value>

Flink预加载分区维表,实时更新维表配置信息

当前我们的业务场景,是基于dataStream代码,维表数据量很大,实时性要求很高,所以采用预加载分区维表模式,kafka广播流实时更新配置。主题:调研预加载分区维表模式业务特点:维表配置数据量很大,实时性要求很高当前业务场景介绍:当前Flink基于dataStream代码编写,每个并行度process的open方法加载全量配置数据保存当前瓶颈点:无法应对超大维表。生产环境维表的配置数据量很大,如果每个并行度都去采用全量的配置会消耗很多内存,同时也会很耗时;有可能加载时间会超过checkpoint设置的timeout时间,导致整个Flinkjob都起不来,出现Down的情况。预加载分区维表优点

【Flink】SpringBoot整合Flink并以集群方式运行,可以通过接口来动态创建执行任务,并行度可通过接口动态配置,可以和业务进行交互,灵活性极强,扩展性极高

查阅无数资料,爬了无数个坑!!!整体思路:把SpringBoot当成一个任务放进Flink集群中运行,并且该任务会一直运行,当其他任务需要执行时只需要调用SpringBoot的接口来动态生成任务,可以把每一个接口都当成一个任务,调用接口时Flink会根据当前环境动态创建任务并执行注意事项: 使用./flinkrun命令以后台运行的方式去运行打好的jar包一、引入以下依赖881.81.13.02.121.7.305.1.472.0.3.RELEASEorg.springframework.bootspring-boot-starter-parent2.1.1.RELEASEorg.springf

Flink系列-1、流式计算简介

版权声明:本文为博主原创文章,遵循CC4.0BY-SA版权协议,转载请附上原文出处链接和本声明。大数据系列文章目录官方网址:https://flink.apache.org/学习资料:https://flink-learning.org.cn/目录数据的时效性流式计算和批量计算流式计算流程和特性分布式计算引擎什么是FlinkFlink中的批和流性能比较Flink流处理特性发展历史Flink在阿里的现状Flink的优势Flink的应用场景国内使用情况数据的时效性日常工作中,我们一般会先把数据存储在表,然后对表的数据进行加工、分析。既然先存储在表中,那就会涉及到时效性概念。如果我们处理以年,月为单